[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS re:Invent 2022 で発表された、AWS IoT Core での MQTT v5 対応に反応して、色々入門しています。
今回は、メッセージ及び、セッション有効期限とクリーンスタートについての実装を試して見ました。
初めに、作成したサンプルが動作している様子を紹介させてください。
左のコンソールが Publisher、右のコンソールが Subscriber です。 Publisher は、パラメータに有効期間(秒)を指定してメッセージを送信します。 メッセージブローカーは、有効期間(秒)だけ、メッセージを保持するので、 期間内に Subscriber が立ち上がってくれば、そのメッセージを取得できます。
1 回目は、有効期間 3 秒で Publish し、3 秒以上経過してから Subscriber が上がったので、メッセージが取得できなかった状況です。 そして、2 回目、3 回目は、どちらも、期間内に上がったので、メッセージが取得できている様子を観測できています。
なお、Subscriber は、オフライン中に、ブローカーに到着しているメッセージを「有効」なものとするため、クリーンスタート(セッション初期化)しないで起動しています。
※ 現時点(2022/12/02)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
2 セッション、メッセージの有効期間及び、クリーンスタート
MQTT v5 では、個々の Publish メッセージに有効期限を設定できます。そして、その保存に影響するのが、セッションの有効期限とクリーンスタートです。
3つの機能を組み合わせることで、多彩なセッション管理とメッセージの運用が可能になります。
(1) メッセージの有効期限 (Message expiry feature)
PUBLISH 時にメッセージに有効期限(秒単位で指定)を設定すると、その期間に応じて、メッセージブローカーでメッセージは保持され、また自動的に削除されます。
これは、サブスクライバーが接続中かそうでないかに関係なく動作します。したがって、何らかの都合でサブスクライバーが切断中であっても、必要なメッセージが欠落する心配がなくなります。
メッセージの有効期限は、4 バイトの整数で設定できますが、「設定なし」の場合、メッセージは期限切れになりません。(無期限となる)
これは、MQTT v3.1 でも利用可能な Retained message によく似ていますが、Retained message は、トピック単位で1つのメッセージで利用するものなので、応用範囲は大きく異なるでしょう。
なお、PUBLISH メッセージには、 retain=true オプションも同時に設定可能です。
(2) セッションの有効期限 (Session expiry feature)
CONNECT 時にセッションの有効期限(秒単位で指定)を使用すると、セッションの固定間隔を定義できます。
有効期限が 0 に設定されているか、CONNECT パケットに有効期限の値が含まれていない場合、クライアントの接続が閉じた時点で、直ちにセッション情報がメッセージブローカー上から削除されます。
セッションの有効期限は、4 バイトの整数で設定できますが、AWS IoT Core では、時間単位の精度で最大が 7 日間となっています。
有効期限が経過すると、特定クライアントのセッション情報は自動的に切断され、保持されていたメッセージも破棄されます。
MQTT v3 | MQTT v5 |
---|---|
cleanSession=false | sessionExpiry >0 , cleanStart=flase |
cleanSession=true | sessionExpiry 0 , cleanStart=true |
(3) クリーン スタート (Slean start flag)
クリーンスタートは、セッションの有効期限と連携して設定できるフラグです。このフラグを有効(true)にすると既存のセッションを使用せず、新しいセッションが開始されます。
参考:https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5
3 利用例
MQTT v3.1 では、セッションは、削除するか、永続するかの選択しかありませんでしたが、期限が明示的に設定できるようになったことで、デバイスの破棄や廃止、テスト環境の撤収などがやりやすくなったと思います。
個々のメッセージに有効期限を設定できるようになった事で、仕様に応じたメッセージの運用が可能になっています。例えは、「現時点の緊急アラート」を示すメッセージと、「ファームウエアの要更新」メッセージでは、仕様上の有効期間が違ってくると思います。MQTT v3.1 では、このような要求を、サブスクライバー側で制御したり、パブリッシャー側の再送などで実装していたと思います。
また、一時的にオフラインとなっているデバイス宛のメッセージを、ブローカーで正確に保持できる仕組みは、非常に多くのデバイス(サブスクライバー)が接続されたシステムで、シンプルなシステム構築が可能になりそうです。
4 実装例
以下が、実装したコードです。
sub.py は、サブスクライバーです。セッションの有効期限は 600 秒で、clean_start=False となっているので、一度起動すると、終了しても、5分以内に起動すれば、セッションは継続されていることになります。
pub.pyは、パブリッシャーです。パラメーターで有効期限を指定してメッセージを送信します。
% python3 pub.py 120 (有効期限、120秒間のメッセージを送信する)
sub.py
import ssl import time import os import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def on_message(client, userdata, message): msg=str(message.payload.decode("utf-8")) print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties)) def on_connect(client, user_data, flags, reason_code, properties=None): client.subscribe('sensor/#', qos=1) def main(): session_expiry_interval = 600 client = mqtt.Client("server_id", protocol = mqtt.MQTTv5) client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.on_message = on_message client.on_connect = on_connect properties_connect = Properties(PacketTypes.CONNECT) properties_connect.SessionExpiryInterval = session_expiry_interval client.connect(endpoint, port, clean_start=False, properties=properties_connect) client.loop_start() time.sleep(5) if __name__ == "__main__": main()
pub.py
import ssl import sys import os import time import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 args = sys.argv if 2 != len(args): print("use: client messageExpiryInterval(sec)") exit() messageExpiryInterval = int(args[1]) dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def on_connect(client, user_data, flags, reason_code, properties=None): properties = Properties(PacketTypes.PUBLISH) properties.MessageExpiryInterval = messageExpiryInterval topic = "sensor/device1" payload = "MESSAGE" client.publish(topic, payload, qos=1, properties=properties) def main(): client = mqtt.Client("client_id", protocol = mqtt.MQTTv5) client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.on_connect = on_connect client.connect(endpoint, port, properties = None) client.loop_start() time.sleep(1) if __name__ == "__main__": main()
5 最後に
今回は、メッセージ及び、セッション有効期限とクリーンスタートについて確認してみました。
この3つの機能を組み合わせることで、メッセージの保持(キャッシュ)の作業をメッセージブローカーに任せることが可能になるため、個々のメッセージ仕様の応じた動作を、非常にシンプルに実装できそうです。
個人的には、メッセージの送信前後に関係なく、有効期限により運用できるあたりが、非常に興味深いと感じています。
引き続き、MQTT v5 で利用可能になった機能について、確認を進めたいと思います。
6 参考リンク
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました